//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// buffer_pool_manager.cpp
//
// Identification: src/buffer/buffer_pool_manager.cpp
//
// Copyright (c) 2015-2021, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//
#include "common/logger.h"

#include "buffer/buffer_pool_manager.h"

#include "common/exception.h"
#include "common/macros.h"
#include "storage/page/page_guard.h"

namespace bustub {

/**
 * Builds a buffer pool with `pool_size` frames.
 *
 * Allocates the frame array, creates the LRU-K replacer, and seeds the free list with every
 * frame index, because no frame holds a page yet.
 *
 * @param pool_size number of frames in the pool
 * @param disk_manager disk manager used for page reads and writes (stored; the destructor does not free it)
 * @param replacer_k the K value handed to the LRU-K replacer
 * @param log_manager log manager (stored only)
 */
BufferPoolManager::BufferPoolManager(size_t pool_size, DiskManager *disk_manager, size_t replacer_k,
                                     LogManager *log_manager)
    : pool_size_(pool_size), disk_manager_(disk_manager), log_manager_(log_manager) {
  // One contiguous array of frames; released with delete[] in the destructor.
  pages_ = new Page[pool_size_];
  replacer_ = std::make_unique<LRUKReplacer>(pool_size_, replacer_k);

  // Initially, every frame is in the free list.
  for (size_t i = 0; i < pool_size_; ++i) {
    free_list_.emplace_back(static_cast<frame_id_t>(i));
  }
  // size_t is printed with %zu; %ld is the wrong specifier and mismatches on platforms where
  // long is narrower than size_t.
  LOG_DEBUG("pool_size_=%zu, replacer_k=%zu", pool_size_, replacer_k);
}

/** Releases the frame array that the constructor allocated with new[]. */
BufferPoolManager::~BufferPoolManager() {
  delete[] pages_;
}

/**
 * Allocates a new page id, places it in a frame, and returns the page pinned once.
 *
 * A frame is taken from the free list when one is available; otherwise a victim is requested
 * from the replacer and written back to disk first if it is dirty. The frame's memory is reset
 * so the new page never exposes the previous occupant's bytes.
 *
 * @param[out] page_id receives the id of the new page; not written when nullptr is returned
 * @return the pinned page, or nullptr when the free list is empty and the replacer cannot evict
 */
auto BufferPoolManager::NewPage(page_id_t *page_id) -> Page * {
  std::lock_guard<std::mutex> lk(latch_);

  frame_id_t frame_id = 0;
  Page *p = nullptr;
  if (!free_list_.empty()) {
    frame_id = free_list_.back();
    free_list_.pop_back();
    p = pages_ + frame_id;
  } else {
    // Evict a victim frame.
    if (!replacer_->Evict(&frame_id)) {
      return nullptr;
    }
    // Write the victim back before its contents are replaced.
    p = pages_ + frame_id;
    if (p->IsDirty()) {
      disk_manager_->WritePage(p->GetPageId(), p->GetData());
      p->is_dirty_ = false;
    }
    p->pin_count_ = 0;
  }

  // Record the access and keep the frame out of the eviction candidates while it is pinned.
  replacer_->RecordAccess(frame_id);
  replacer_->SetEvictable(frame_id, false);

  *page_id = AllocatePage();

  // Drop the old mapping only if it still points at this frame. A frame taken from the free list
  // keeps the id of the page that was deleted from it, and that id may since have been fetched
  // into a different frame; erasing unconditionally would orphan that live page.
  auto stale = page_table_.find(p->GetPageId());
  if (stale != page_table_.end() && stale->second == frame_id) {
    page_table_.erase(stale);
  }
  page_table_[*page_id] = frame_id;
  p->page_id_ = *page_id;
  p->pin_count_++;

  // A new page starts from cleared memory rather than the previous occupant's data.
  p->ResetMemory();

  return p;
}

/**
 * Places `page_id` into a frame and returns the page pinned once.
 *
 * This function does not acquire latch_ itself; its caller in this file, FetchPage, holds the
 * latch while calling it. It does not check whether `page_id` is already in the page table.
 *
 * A frame is taken from the free list when one is available; otherwise a victim is requested
 * from the replacer and written back to disk first if it is dirty.
 *
 * @param page_id id of the page to place in a frame
 * @param read_page when true, the frame is cleared and the page content is read from disk;
 *                  when false, the frame keeps whatever bytes it held before
 * @param access_type unused
 * @return the pinned page, or nullptr when the free list is empty and the replacer cannot evict
 */
auto BufferPoolManager::FetchPage1(page_id_t page_id, bool read_page, [[maybe_unused]] AccessType access_type)
    -> Page * {
  frame_id_t frame_id = 0;
  Page *p = nullptr;
  if (!free_list_.empty()) {
    frame_id = free_list_.back();
    free_list_.pop_back();
    p = pages_ + frame_id;
  } else {
    // Evict a victim frame.
    if (!replacer_->Evict(&frame_id)) {
      return nullptr;
    }
    // Write the victim back before its contents are replaced.
    p = pages_ + frame_id;
    if (p->IsDirty()) {
      disk_manager_->WritePage(p->GetPageId(), p->GetData());
      p->is_dirty_ = false;
    }
    p->pin_count_ = 0;
  }

  // Record the access and keep the frame out of the eviction candidates while it is pinned.
  replacer_->RecordAccess(frame_id);
  replacer_->SetEvictable(frame_id, false);

  // Drop the old mapping only if it still points at this frame. A frame taken from the free list
  // keeps the id of the page that was deleted from it, and that id may since have been fetched
  // into a different frame; erasing unconditionally would orphan that live page.
  auto stale = page_table_.find(p->GetPageId());
  if (stale != page_table_.end() && stale->second == frame_id) {
    page_table_.erase(stale);
  }
  page_table_[page_id] = frame_id;
  p->page_id_ = page_id;
  p->pin_count_++;

  // Load the page content from disk.
  if (read_page) {
    p->ResetMemory();
    disk_manager_->ReadPage(page_id, p->GetData());
  }

  return p;
}

/**
 * Returns the page with the given id, pinned one more time.
 *
 * A page already present in the page table is pinned in place; otherwise the request is handed
 * to FetchPage1, which places the page in a frame and reads it from disk.
 *
 * @param page_id id of the requested page; negative ids are rejected
 * @param access_type unused
 * @return the pinned page, or nullptr for a negative id or when FetchPage1 finds no frame
 */
auto BufferPoolManager::FetchPage(page_id_t page_id, [[maybe_unused]] AccessType access_type) -> Page * {
  std::lock_guard<std::mutex> guard(latch_);

  if (page_id < 0) {
    return nullptr;
  }

  const auto entry = page_table_.find(page_id);
  if (entry == page_table_.end()) {
    // Miss: bring the page in from disk while still holding the latch.
    return FetchPage1(page_id, true);
  }

  // Hit: pin the resident page and take its frame out of the eviction candidates.
  const frame_id_t resident_frame = entry->second;
  Page *page = &pages_[resident_frame];
  page->pin_count_ += 1;
  replacer_->SetEvictable(resident_frame, false);
  replacer_->RecordAccess(resident_frame);
  return page;
}

/**
 * Drops one pin from a resident page.
 *
 * The dirty flag is sticky: a true `is_dirty` sets it, a false one never clears it. When the
 * pin count reaches zero the frame is marked evictable in the replacer.
 *
 * @param page_id id of the page to unpin
 * @param is_dirty whether the caller modified the page
 * @param access_type unused
 * @return false if the page is not in the page table or its pin count is already <= 0; true otherwise
 */
auto BufferPoolManager::UnpinPage(page_id_t page_id, bool is_dirty, [[maybe_unused]] AccessType access_type) -> bool {
  std::lock_guard<std::mutex> guard(latch_);

  const auto entry = page_table_.find(page_id);
  if (entry == page_table_.end()) {
    return false;
  }

  const frame_id_t frame = entry->second;
  Page *page = &pages_[frame];
  if (page->GetPinCount() <= 0) {
    return false;
  }

  page->pin_count_ -= 1;
  page->is_dirty_ = page->is_dirty_ || is_dirty;

  if (page->GetPinCount() == 0) {
    replacer_->SetEvictable(frame, true);
  }
  return true;
}

/**
 * Writes a resident page to disk, whether or not it is dirty, and clears its dirty flag.
 *
 * @param page_id id of the page to flush
 * @return false if the page is not in the page table, true after the write
 */
auto BufferPoolManager::FlushPage(page_id_t page_id) -> bool {
  std::lock_guard<std::mutex> guard(latch_);

  const auto entry = page_table_.find(page_id);
  if (entry == page_table_.end()) {
    return false;
  }

  Page *page = &pages_[entry->second];
  disk_manager_->WritePage(page_id, page->GetData());
  page->is_dirty_ = false;
  return true;
}

void BufferPoolManager::FlushAllPages() {
  std::lock_guard<std::mutex> lk(latch_);
  for (page_id_t i = 0; i <= next_page_id_; ++i) {
    auto frame_id = page_table_[i];
    auto p = pages_ + frame_id;
    disk_manager_->WritePage(i, p->GetData());
    p->is_dirty_ = false;
  }
}

/**
 * Removes a page from the buffer pool and returns its frame to the free list.
 *
 * @param page_id id of the page to delete
 * @return true if the page is not in the page table or was removed; false if it is still pinned
 */
auto BufferPoolManager::DeletePage(page_id_t page_id) -> bool {
  std::lock_guard<std::mutex> guard(latch_);

  const auto entry = page_table_.find(page_id);
  if (entry == page_table_.end()) {
    // Nothing to remove.
    return true;
  }

  const frame_id_t frame = entry->second;
  Page *page = &pages_[frame];
  if (page->GetPinCount() > 0) {
    return false;
  }

  // Forget the frame in the replacer and the page table, then recycle the frame.
  replacer_->Remove(frame);
  page_table_.erase(page_id);
  free_list_.push_back(frame);

  // NOTE(review): page_id_ is left unchanged here, so the freed frame still reports the deleted id.
  page->ResetMemory();
  page->is_dirty_ = false;
  DeallocatePage(page_id);
  return true;
}

/** Returns the current value of next_page_id_ and advances the counter by one. */
auto BufferPoolManager::AllocatePage() -> page_id_t {
  const page_id_t allocated = next_page_id_++;
  return allocated;
}

/** Fetches the page and wraps the result (possibly nullptr) in a BasicPageGuard. */
auto BufferPoolManager::FetchPageBasic(page_id_t page_id) -> BasicPageGuard {
  Page *fetched = FetchPage(page_id);
  return {this, fetched};
}

/**
 * Fetches the page, takes its read latch when the fetch succeeded, and wraps the result in a
 * ReadPageGuard. A failed fetch yields a guard built around nullptr.
 */
auto BufferPoolManager::FetchPageRead(page_id_t page_id) -> ReadPageGuard {
  Page *fetched = FetchPage(page_id);
  const bool have_page = fetched != nullptr;
  if (have_page) {
    fetched->RLatch();
  }
  return ReadPageGuard{this, fetched};
}

/**
 * Fetches the page, takes its write latch when the fetch succeeded, and wraps the result in a
 * WritePageGuard. A failed fetch yields a guard built around nullptr.
 */
auto BufferPoolManager::FetchPageWrite(page_id_t page_id) -> WritePageGuard {
  Page *fetched = FetchPage(page_id);
  const bool have_page = fetched != nullptr;
  if (have_page) {
    fetched->WLatch();
  }
  return WritePageGuard{this, fetched};
}

/** Creates a new page via NewPage and wraps the result (possibly nullptr) in a BasicPageGuard. */
auto BufferPoolManager::NewPageGuarded(page_id_t *page_id) -> BasicPageGuard {
  Page *created = NewPage(page_id);
  return {this, created};
}
/**
 * Creates a new page via NewPage, takes its write latch, and wraps it in a WritePageGuard.
 *
 * NewPage returns nullptr when no frame is available; in that case no latch is taken and the
 * guard is built around nullptr, the same way FetchPageWrite handles a failed fetch.
 *
 * @param[out] page_id receives the id of the new page when creation succeeds
 * @return a write guard over the new page, or over nullptr when no page could be created
 */
auto BufferPoolManager::NewPageGuardedWrite(page_id_t *page_id) -> WritePageGuard {
  auto *p = NewPage(page_id);
  if (p != nullptr) {
    p->WLatch();
  }
  return WritePageGuard{this, p};
}

}  // namespace bustub
